Performant, portable, structured concurrency operations for async Rust. It works with any runtime, does not erase lifetimes, always handles cancellation, and always returns output to the caller.
futures-concurrency provides concurrency operations for groups of futures and groups of streams, covering both bounded and unbounded sets. In both cases performance should be on par with, if not better than, conventional executor implementations.
§Examples
Await multiple futures of different types
use futures_concurrency::prelude::*;
use std::future;
let a = future::ready(1u8);
let b = future::ready("hello");
let c = future::ready(3u16);
assert_eq!((a, b, c).join().await, (1, "hello", 3));
Concurrently process items in a collection
use futures_concurrency::prelude::*;
let v: Vec<_> = vec!["chashu", "nori"]
    .into_co_stream()
    .map(|msg| async move { format!("hello {msg}") })
    .collect()
    .await;
assert_eq!(v, &["hello chashu", "hello nori"]);
Access stack data outside the futures’ scope
Adapted from std::thread::scope.
use futures_concurrency::prelude::*;
let mut container = vec![1, 2, 3];
let mut num = 0;
let a = async {
    println!("hello from the first future");
    dbg!(&container);
};
let b = async {
    println!("hello from the second future");
    num += container[0] + container[2];
};
println!("hello from the main future");
let _ = (a, b).join().await;
container.push(4);
assert_eq!(num, container.len());
§Operations
§Futures
For futures which return a regular type T, only the join and race operations are available. join waits for all futures to complete, while race waits for the first future to complete. However, for futures which return a Try<Output = T>, two additional operations are available. The following table describes the behavior of concurrency operations for fallible futures:
| | Wait for all outputs | Wait for first output |
|---|---|---|
| Continue on error | Future::join | Future::race_ok |
| Short-circuit on error | Future::try_join | Future::race |
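The four cells of the table can be illustrated with a small synchronous model. This is only a sketch of the semantics, not the crate's API: no futures are polled, the input is assumed to be the list of outcomes in the order the futures finished, and every name (Completed, model_join, model_try_join, model_race, model_race_ok) is hypothetical.

```rust
/// Outcomes of a set of fallible futures, listed in completion order.
/// A synchronous stand-in for illustration; nothing is actually awaited.
pub type Completed<T, E> = Vec<Result<T, E>>;

/// join: wait for all outputs and continue past errors.
pub fn model_join<T, E>(done: Completed<T, E>) -> Vec<Result<T, E>> {
    done
}

/// try_join: wait for all outputs, but short-circuit on the first error.
pub fn model_try_join<T, E>(done: Completed<T, E>) -> Result<Vec<T>, E> {
    // Collecting into a Result stops at the first Err it encounters.
    done.into_iter().collect()
}

/// race: take the first output to arrive, whether it is Ok or Err.
/// Returns None only when the input set is empty.
pub fn model_race<T, E>(done: Completed<T, E>) -> Option<Result<T, E>> {
    done.into_iter().next()
}

/// race_ok: take the first Ok; if every future fails, return all errors.
pub fn model_race_ok<T, E>(done: Completed<T, E>) -> Result<T, Vec<E>> {
    let mut errors = Vec::new();
    for outcome in done {
        match outcome {
            Ok(value) => return Ok(value),
            Err(err) => errors.push(err),
        }
    }
    Err(errors)
}
```

In this model, a set that completes as Err("timeout"), Ok(1), Ok(2) gives Err("timeout") for both model_try_join and model_race, while model_race_ok skips the error and gives Ok(1).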
The following futures implementations are provided by futures-concurrency:
- FutureGroup: A growable group of futures which operate as a single unit.
- tuple: join, try_join, race, race_ok
- array: join, try_join, race, race_ok
- Vec: join, try_join, race, race_ok
§Streams
Streams yield outputs one-by-one, which means that deciding to stop iterating is the same for fallible and infallible streams. The operations provided for streams can be categorized based on whether their inputs can be concurrently evaluated, and whether their outputs can be concurrently processed.
merge, specifically, takes N streams in and yields items one-by-one as soon as any are available. This enables the output of individual streams to be concurrently processed by further operations later on.
| | Sequential output processing | Concurrent output processing |
|---|---|---|
| Sequential input evaluation | Stream::chain | not yet available ‡ |
| Concurrent input evaluation | Stream::zip | Stream::merge |
‡: This could be addressed by a hypothetical Stream::unzip operation, however because we aspire for semantic compatibility with std::iter::Iterator in our operations, the path to adding it is currently unclear.
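To make the difference between chain, zip, and merge concrete, the sketch below models each stream as a list of items tagged with an assumed arrival time. It is a synchronous illustration only: the Timed alias and the model_chain, model_zip, and model_merge functions are hypothetical and are not part of futures-concurrency, and breaking ties in favor of the first stream is a choice made for this model.

```rust
/// One stream in the model: (arrival_time, item) pairs, sorted by time.
pub type Timed<T> = Vec<(u64, T)>;

/// chain: drain the first stream completely, then the second.
/// Arrival times are ignored because the inputs are evaluated in sequence.
pub fn model_chain<T>(a: Timed<T>, b: Timed<T>) -> Vec<T> {
    a.into_iter().chain(b).map(|(_, item)| item).collect()
}

/// zip: pair up the n-th items of both streams, stopping when either ends.
pub fn model_zip<T, U>(a: Timed<T>, b: Timed<U>) -> Vec<(T, U)> {
    a.into_iter().zip(b).map(|((_, x), (_, y))| (x, y)).collect()
}

/// merge: yield items from both streams in the order they become available.
pub fn model_merge<T>(a: Timed<T>, b: Timed<T>) -> Vec<T> {
    let mut all: Vec<(u64, T)> = a.into_iter().chain(b).collect();
    // Stable sort by arrival time; in this model, ties keep `a` before `b`.
    all.sort_by_key(|(time, _)| *time);
    all.into_iter().map(|(_, item)| item).collect()
}
```

With stream a producing "a1" at time 1 and "a2" at time 4, and stream b producing "b1" at time 2 and "b2" at time 3, model_chain yields a1, a2, b1, b2, while model_merge yields a1, b1, b2, a2.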
The following streams implementations are provided by futures-concurrency:
- StreamGroup: A growable group of streams which operate as a single unit.
- ConcurrentStream: A trait for asynchronous streams which can concurrently process items.
- tuple: chain, merge, zip
- array: chain, merge, zip
- Vec: chain, merge, zip
§Runtime Support
futures-concurrency does not depend on any runtime executor being present. This enables it to work out of the box with any async runtime, including: tokio, async-std, smol, glommio, and monoio. It also supports #[no_std] environments, allowing it to be used with embedded async runtimes such as embassy.
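As a rough illustration of why a concurrency operation does not need a runtime of its own, the sketch below builds a two-way join purely from Future::poll and drives it with a toy single-threaded executor written against std. The names ThreadWaker, block_on, and join2 are hypothetical and invented for this example; they are not part of futures-concurrency and do not reflect how the crate is implemented.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker: unparks the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: drives one future to completion on the
/// current thread, parking between polls.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical two-way join built only from `poll`: no spawning, no runtime.
pub async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let (mut out_a, mut out_b) = (None, None);
    poll_fn(|cx| {
        // Poll each future that has not finished yet.
        if out_a.is_none() {
            if let Poll::Ready(value) = a.as_mut().poll(cx) {
                out_a = Some(value);
            }
        }
        if out_b.is_none() {
            if let Poll::Ready(value) = b.as_mut().poll(cx) {
                out_b = Some(value);
            }
        }
        // Finish only once both outputs are in hand; otherwise put back
        // whatever has been collected so far and keep waiting.
        match (out_a.take(), out_b.take()) {
            (Some(x), Some(y)) => Poll::Ready((x, y)),
            (x, y) => {
                out_a = x;
                out_b = y;
                Poll::Pending
            }
        }
    })
    .await
}
```

Because join2 only composes the poll calls of its inputs, the same future could be handed to any executor; calling block_on(join2(async { 1u8 }, async { "hello" })) in this sketch returns the pair (1, "hello").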
§Feature Flags
The std feature flag is enabled by default. To target alloc or no_std environments, you can enable the following configuration:
[dependencies]
# no_std
futures-concurrency = { version = "7.5.0", default-features = false }
# alloc
futures-concurrency = { version = "7.5.0", default-features = false, features = ["alloc"] }
§Further Reading
futures-concurrency has been developed over the span of several years. It is primarily maintained by Yosh Wuyts, a member of the Rust Async WG. You can read more about the development and ideas behind futures-concurrency here:
Modules§
- Helper functions and types for fixed-length arrays.
- Concurrent execution of streams
- Asynchronous basic functionality.
- The futures concurrency prelude.
- Composable asynchronous iteration.
- Parallel iterator types for vectors (Vec<T>)